-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6144: option to query restoring and standby #7962
KAFKA-6144: option to query restoring and standby #7962
Conversation
@vinothchandar & @brary, This is the second chunk from your PR #7868, which I reviewed and feel good about merging. I'm sending a message to the mailing list for KIP-535, since this PR reveals some small changes to the public API. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM overall.. Bunch of cosmetic suggestions. but one clarification on correctness/test
* | ||
* @param storeName name of the store to find | ||
* @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)} | ||
* @param queryStaleData If false, only permit queries on the leader replica for a partition, and only if the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we use active
instead of leader
to stay consistent with streams terminology?
* | ||
* @param storeProvider provides access to all the underlying StateStore instances | ||
* @param storeName The name of the Store | ||
* @param queryStaleState Whether to allow querying recovering and standby stores. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
recovering
-> restoring
? (recovering talks about task state and not store state?)
*/ | ||
public <T> T store(final String storeName, | ||
final QueryableStoreType<T> queryableStoreType, | ||
final boolean queryStaleData) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
standardize on queryStaleState
vs queryStaleData
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops. slipped past me.
* @param storeName name of the store | ||
* @param queryableStoreType accept stores passing {@link QueryableStoreType#accepts(StateStore)} | ||
* @param <T> The expected type of the returned store | ||
* @param includeStandbyAndRecovering if true, include standbys and recovering stores; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason to swtich to includeStandbyAndRecovering
? could we keep queryStaleState
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had been thinking about it as a directive internally about which stores to bundle, versus an API statement about which consistency level to query at, but upon reflection, even the API is just getting a store, not actually doing a query. I'm standardizing both terms to be the same.
} else { | ||
stores.add((T) store); | ||
final StreamThread.State state = streamThread.state(); | ||
if (includeStandbyAndRecovering ? state.isRunning() : state == StreamThread.State.RUNNING) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is creative :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
state.isRunning() is as below
public boolean isRunning() {
return equals(RUNNING) || equals(STARTING) || equals(PARTITIONS_REVOKED) || equals(PARTITIONS_ASSIGNED);
}
if its not too much work, we can rename to something like state.isAlive()
, that captures what we want to check .. your call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the case where includeStandbyAndRecovering=false
and the state is RUNNING
.. We still want to disallow queries on standbys? from code below, it seems like we will enter the if block and loop over all active and standby tasks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
really nice catch! Thanks.
Regarding isRunning, I had a similar thought, but couldn't remember if StreamThread.State is exposed in the public API. After some code analysis, I don't think it is, so I've renamed it.
|
||
waitForApplicationState(streamsList.subList(1, numThreads), State.NOT_RUNNING, Duration.ofSeconds(60)); | ||
|
||
// Now, confirm that all the keys are still queryable on the remaining thread, regardless of the state |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To close out the earlier thread.. This test is okay, since NOT_RUNNING
will make that instance go to DEAD state (or some non functional state like that) where the store cannot be obtained.. the lines below check that we can stil retrieve the keys from the other replica
@@ -64,8 +64,9 @@ public void before() { | |||
|
|||
theStore = new CompositeReadOnlyKeyValueStore<>( | |||
new WrappingStoreProvider(Arrays.<StateStoreProvider>asList(stubProviderOne, stubProviderTwo)), | |||
QueryableStoreTypes.<String, String>keyValueStore(), | |||
storeName); | |||
QueryableStoreTypes.<String, String>keyValueStore(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is the kafka way to align this with the (
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not fixed in the style guide. The only thing is says is either to put all params on a single line or put each param on a separate line.
What I've seen (which I think makes sense) is, if the line's not too long, and it doesn't hinder readability, you can either do this:
myMethod(one
two,
three);
or
myMethod(
one,
two,
three
);
Experience says that either one might be more readable in different circumstances, so it makes sense to allow both. They're both equally obvious regarding where all the params are and where the method call ends. The latter one uses more vertical space for simple method calls, but is more friendly when some of the arguments are themselves method calls.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation. I was more curious about this, than anything.. Personally, the latter is very natural and easy to do. I just go with the flow, in these things. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. I also prefer the latter. Thanks for asking! It's way easier to review code that already conforms to the norm, whether or not it's required by the style guide.
@@ -509,6 +509,94 @@ public void queryOnRebalance() throws Exception { | |||
} | |||
} | |||
|
|||
@Test | |||
public void shouldBeAbleQueryStandbyStateDuringRebalance() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tthese verifyAllKVKeys
methods will call final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.keyValueStore());
, which should fail by default on the standby? (given thats changed from original PR)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth having two tests for both modes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, that's correct. The test does fail on this version of the code. Nice catch!
The test for active replicas would pass regardless, it's just a question of whether it returns the data during recovery or after the transition to running. But the test you added for standbys would only pass if we allow querying stale state.
I'm not sure it makes sense to add a negative test for standbys, though, since there are already tests ensuring that we throw the appropriate exceptions when using the old method (with a default of no stale stores). WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense... I will take another look at this when I rebase against the lag PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Implements: KIP-535 Co-authored-by: Navinder Pal Singh Brar <[email protected]> Reviewed-by: John Roesler <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just few comments on interfaces.. This looks much simpler though
* | ||
* @param storeName name of the store to find | ||
* @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)} | ||
* @param includeStaleStores If false, only permit queries on the active replica for a partition, and only if the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cosmetic: extra space at the start
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I noticed that, too. Oh well.
final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType); | ||
if (!globalStore.isEmpty()) { | ||
return queryableStoreType.create(new WrappingStoreProvider(singletonList(globalStoreProvider)), storeName); | ||
return queryableStoreType.create(globalStoreProvider, storeName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thinking aloud: guess there is nt much value in wrapping a single provider.. so +1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, the role of the wrapping store provider really is nothing more than just iterating over the list of providers. For global stores, there's always exactly one provider, so it's purely a performance penalty for no gain at all.
/** | ||
* A wrapper over all of the {@link StateStoreProvider}s in a Topology | ||
*/ | ||
public class QueryableStoreProvider { | ||
|
||
private final List<StateStoreProvider> storeProviders; | ||
private final List<StreamThreadStateStoreProvider> storeProviders; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thinking aloud: even though we are binding to a specific StateStoreProvider implementation here, it seems fine,since there are n't any other really in a topology
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is part of the point of this change, actually, StreamThreadStateStoreProvider
is no longer a StateStoreProvider
. If you need a StateStoreProvider, you have to adapt it with WrappingStoreProvider
.
* Wrapper over StreamThread that implements StateStoreProvider | ||
*/ | ||
public class StreamThreadStateStoreProvider implements StateStoreProvider { | ||
public class StreamThreadStateStoreProvider { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With this change, just Global and Wrapping StateStoreProviders exist? IIUC, most of the xxxStore
classes are just accessing the Wrapping..
store provider? Makes me wonder, if we should just use the QueryableStoreProvider
everywhere and cull the interface..
Anyway, I am not familiar enough with this part of the code. So I leave it to you..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also feel that we can collapse the layered interfaces a bit further after we've changed this, since the original motivation of having it is just to "stitch" the global stores and local stores together when exposing as KafkaStreams#stores
.
We can consider that in a separate, cleanup PR afterwards.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, StateStoreProvider is used in the public API as part of QueryableStoreType. Its role is well defined already. That's the one that we should continue to use "everywhere".
QueryableStoreProvider
is a utility for selecting either the global store to return OR packaging the thread store providers into a WrappingStoreProvider
and using it to actually create a usable store. It's not a StateStoreProvider
at all.
WrappingStoreProvider
on the other hand is a StateStoreProvider
, and adapts a List<StreamThreadStoreProvider>
to the StateStoreProvider
interface.
GlobalStoreProvider
is also a StateStoreProvider
.
So, if anything, we could increase clarity by renaming some of these classes to more accurately reflect their roles, but all the involved classes have well defined and necessary roles right now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@guozhangwang I think we wrote our comments at the same time. Just to respond directly, I do think that we can clean up the code base in a follow-on change (by renaming stuff that's named misleadingly now), but I don't think we can collapse the hierarchy at all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! SGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can work on the follow-on change.
WrappingStoreProvider -> WrappingStateStoreProvider
GlobalStoreProvider -> GlobalStateStoreProvider
QueryableStoreProvider is left as is.
@@ -352,7 +347,7 @@ private void verifyAllWindowedKeys(final List<KafkaStreams> streamsList, | |||
final int index = queryMetadata.getActiveHost().port(); | |||
final KafkaStreams streamsWithKey = pickInstanceByPort ? streamsList.get(index) : streams; | |||
final ReadOnlyWindowStore<String, Long> store = | |||
streamsWithKey.store(storeName, QueryableStoreTypes.windowStore()); | |||
streamsWithKey.store(storeName, QueryableStoreTypes.windowStore(), true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cool. this should now allow standby to be queried
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QQ: in this PR we did not make changes to QueryableStoreType
as discussed in the mailing thread, is that going to be in a different PR?
* Wrapper over StreamThread that implements StateStoreProvider | ||
*/ | ||
public class StreamThreadStateStoreProvider implements StateStoreProvider { | ||
public class StreamThreadStateStoreProvider { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also feel that we can collapse the layered interfaces a bit further after we've changed this, since the original motivation of having it is just to "stitch" the global stores and local stores together when exposing as KafkaStreams#stores
.
We can consider that in a separate, cleanup PR afterwards.
Thanks for the review @vinothchandar ! I think you've pointed out some good opportunities to improve the clarity of the internal implementation, but I'd like to defer that for now, so that we can continue to make progress on #7868 |
@guozhangwang , that is correct. It turned out that there was some abuse of the class hierarchy in the code base, which, once removed, means that we didn't have to change the |
LGTM. Please feel free to merge. |
Thanks, all! The one test failure was in I'm proceeding to merge this. |
Conflicts or compilation errors due to the fact that we temporarily reverted the commit that removes Scala 2.11 support: * AclCommand.scala: take upstream changes. * AclCommandTest.scala: take upstream changes. * TransactionCoordinatorTest.scala: don't use SAMs, but adjust mock call to putTransactionStateIfNotExists given new signature. * TransactionStateManagerTest: use Runnable instead of SAMs. * PartitionLockTest: use Runnable instead of SAMs. * docs/upgrade.html: take upstream changes excluding line that states that Scala 2.11 support has been removed. * apache-github/trunk: (28 commits) KAFKA-9457; Fix flaky test org.apache.kafka.common.network.SelectorTest.testGracefulClose (apache#7989) MINOR: Update AclCommand help message to match implementation (apache#7990) MINOR: Update introduction page in Kafka documentation MINOR: Use Math.min for StreamsPartitionAssignor#updateMinReceivedVersion method (apache#7954) KAFKA-9338; Fetch session should cache request leader epoch (apache#7970) KAFKA-9329; KafkaController::replicasAreValid should return error message (apache#7865) KAFKA-9449; Adds support for closing the producer's BufferPool. (apache#7967) MINOR: Handle expandIsr in PartitionLockTest and ensure read threads not blocked on write (apache#7973) MINOR: Fix typo in connect integration test class name (apache#7976) KAFKA-9218: MirrorMaker 2 can fail to create topics (apache#7745) KAFKA-8847; Deprecate and remove usage of supporting classes in kafka.security.auth (apache#7966) MINOR: Suppress DescribeConfigs Denied log during CreateTopics (apache#7971) [MINOR]: Fix typo in Fetcher comment (apache#7934) MINOR: Remove unnecessary call to `super` in `MetricConfig` constructor (apache#7975) MINOR: fix flaky StreamsUpgradeTestIntegrationTest (apache#7974) KAFKA-9431: Expose API in KafkaStreams to fetch all local offset lags (apache#7961) KAFKA-9235; Ensure transaction coordinator is stopped after replica deletion (apache#7963) KAFKA-9410; Make groupId Optional in KafkaConsumer (apache#7943) MINOR: Removed accidental double negation in error message. (apache#7834) KAFKA-6144: IQ option to query standbys (apache#7962) ...
This is based on a temporary branch, which is mirrored from #7960.
I will delete the temporary branch once #7960 is merged and re-target this PR to trunk.
Committer Checklist (excluded from commit message)